# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import hashlib
import gzip
import numpy as np
import scipy as sp
import sympy as sm
from scipy import interpolate
from hysop.tools.io_utils import IO
from hysop.tools.numerics import mpq, mpfr, mpqize
from hysop.tools.cache import load_data_from_cache, update_cache
from hysop.numerics.odesolvers.runge_kutta import ExplicitRungeKutta
[docs]
class Kernel:
def __init__(self, register=False, verbose=False, split_polys=False, **kargs):
"""
Use SymmetricKernelGenerator or KernelGenerator to generate a kernel.
Do not call directly.
split_polys: Split each polynomial in two, to gain precision.
"""
dic = kargs
varnames = ["n", "r", "deg", "Ms", "Mh", "H", "remesh", "P"]
for var in varnames:
setattr(self, var, dic[var])
self.split_polys = split_polys
self.varnames = varnames
if register:
self._register(dic)
self._build(verbose, split_polys)
self._hashable = True
[docs]
@classmethod
def hash_kernel_key(cls, n, r, deg, Ms, H, remesh):
s = f"{n}_{r}_{deg}_{Ms}_{H}_{int(remesh)}"
return hashlib.sha256(s.encode("utf-8")).hexdigest()
[docs]
@classmethod
def cache_file(cls):
_cache_dir = IO.cache_path() + "/numerics"
_cache_file = _cache_dir + "/remesh.pklz"
return _cache_file
def __eq__(self, other):
if not isinstance(other, ExplicitRungeKutta):
return NotImplemented
eq = self.split_polys == other.split_polys
for var in self.varnames:
eq &= getattr(self, var) == getattr(other, var)
return eq
def __ne__(self, other):
eq = self == other
if isinstance(eq, ExplicitRungeKutta):
return NotImplemented
return not eq
def __hash__(self):
h = self.hash_kernel_key(self.n, self.r, self.deg, self.Ms, self.H, self.remesh)
return hash((h, self.split_polys))
def _build(self, verbose, split_polys):
# polynom symbolic variables
x = sm.Symbol("x")
t = sm.Symbol("t")
# usefull vars
Ms = self.Ms
deg = self.deg
P = self.P
self.Px = P
if verbose:
print(" => Substitution in Polynomials")
for Pix in P:
print(" ", Pix.all_coeffs()[::-1])
print()
for Pix in P:
print(" ", sm.horner(Pix))
print()
# split polynomials
X = np.arange(-Ms, +Ms + 1)
if split_polys:
Pt_l = []
Pt_r = []
Pt_L = []
Pt_R = []
Cl = np.empty(shape=(deg + 1, 2 * Ms), dtype=np.float64)
Cr = np.empty(shape=(deg + 1, 2 * Ms), dtype=np.float64)
CL = np.empty(shape=(deg + 1, 2 * Ms), dtype=np.float64)
CR = np.empty(shape=(deg + 1, 2 * Ms), dtype=np.float64)
for i, Pix in enumerate(P):
Pit_l = sm.polys.polytools.poly(
Pix.as_expr().xreplace({x: t + X[i]}), t, domain="QQ"
)
Pit_r = sm.polys.polytools.poly(
Pix.as_expr().xreplace({x: X[i] + 1 - t}), t, domain="QQ"
)
Pit_L = sm.polys.polytools.poly(
Pix.as_expr().xreplace({x: +1 - t + X[i]}), t, domain="QQ"
)
Pit_R = sm.polys.polytools.poly(
Pix.as_expr().xreplace({x: X[i] - t}), t, domain="QQ"
)
Cl[:, i] = np.asarray(Pit_l.all_coeffs(), dtype=np.float64)
Cr[:, i] = np.asarray(Pit_r.all_coeffs(), dtype=np.float64)
CL[:, i] = np.asarray(Pit_L.all_coeffs(), dtype=np.float64)
CR[:, i] = np.asarray(Pit_R.all_coeffs(), dtype=np.float64)
Pt_l.append(Pit_l)
Pt_r.append(Pit_r)
Pt_L.append(Pit_L)
Pt_R.append(Pit_R)
self.Cl = Cl
self.Cr = Cr
self.CL = CL
self.CR = CR
self.Pt_l = Pt_l
self.Pt_r = Pt_r
self.Pt_L = Pt_L
self.Pt_R = Pt_R
if verbose:
print(" => Splitting polynomials")
for p in Pt_l:
print(" ", p.all_coeffs())
print()
for p in Pt_r:
print(" ", p.all_coeffs())
print()
print()
for p in Pt_l:
print(" ", sm.horner(p))
print()
for p in Pt_r:
print(" ", sm.horner(p))
else:
Pt = []
Pt_L = []
C = np.empty(shape=(deg + 1, 2 * Ms), dtype=np.float64)
CL = np.empty(shape=(deg + 1, 2 * Ms), dtype=np.float64)
for i, Pix in enumerate(P):
Pit = sm.polys.polytools.poly(
Pix.as_expr().xreplace({x: t + X[i]}), t, domain="QQ"
)
Pit_L = sm.polys.polytools.poly(
Pix.as_expr().xreplace({x: 1 - t + X[i]}), t, domain="QQ"
)
C[:, i] = np.asarray(Pit.all_coeffs(), dtype=np.float64)
CL[:, i] = np.asarray(Pit_L.all_coeffs(), dtype=np.float64)
Pt.append(Pit)
Pt_L.append(Pit_L)
self.Pt = Pt
self.Pt_L = Pt_L
self.C = C
self.CL = CL
if verbose:
print(" => Substituing x = t+i")
for Pit in Pt:
print(" ", Pit.all_coeffs()[::-1])
print()
for Pit in Pt:
print(" ", sm.horner(Pit))
print()
if verbose:
print(" => Generating lambdas")
imin = -Ms
imax = +Ms
if split_polys:
gamma_l = sp.interpolate.PPoly(Cl, X, extrapolate=False)
gamma_r = sp.interpolate.PPoly(Cr, X, extrapolate=False)
def gamma(x):
i = np.floor(x)
z = x - i
res = (z >= 0.5) * gamma_r(i + 1 - z) + (z < 0.5) * gamma_l(x)
res[np.isnan(res)] = 0.0
return res
else:
gamma_ = sp.interpolate.PPoly(C, X, extrapolate=False)
def gamma(x):
res = gamma_(x)
res[np.isnan(res)] = 0.0
return res
self.I = X
self.gamma = gamma
if split_polys:
self.gamma_l = gamma_l
self.gamma_r = gamma_r
self.poly_splitted = True
else:
self.poly_splitted = False
if verbose:
print(" All done.")
def _register(self, dic):
key = self.hash_kernel_key(
self.n, self.r, self.deg, self.Ms, self.H, self.remesh
)
update_cache(self.cache_file(), key, dic)
def __call__(self, x):
Ms = self.Ms
res = self.gamma(x)
return res
[docs]
class SymmetricKernelGenerator:
"""
Generate a symmetric piecewise polynomial that
preserves the first discrete moments of a given symmetric stencil H.
If H is None, an interpolation stencil is generated.
"""
SINGULAR_SYSTEM = {}
def __init__(self, verbose=False):
self.verbose = verbose
self.configured = False
[docs]
def solve(self, r, override_cache=False, split_polys=False, no_wrap=False):
assert self.configured, "SymmetricKernelGenerator has not been configured."
assert r >= 0, "r<0."
deg = 2 * r + 1
n = self.n
Ms = self.Ms
H = self.H
Mh = self.Mh
remesh = self.remesh
verbose = self.verbose
if no_wrap:
cls = dict
else:
cls = Kernel
if verbose:
print(
"\nSymmetricKernelGenerator was called with the following parameters:"
)
print(f" n = {n} (preserved discrete moments)")
print(f" r = {r} (overall kernel regularity)")
print(f" Ms = {Ms} (piecewise polynomial count)")
print(f" deg = {deg} (local polynomial regularity)")
print()
print(" H = [" + ",".join([h.__str__() for h in H]) + "]")
if not self.remesh:
print(" Mh = [" + ",".join([m.__str__() for m in Mh]) + "]")
print()
# NG 28 august 2024: H = mpqize(np.asarray(H))
# This solution is ok for Interger values, but not for Real values.
# Bug or restriction concerning sm.Rational(f2q()).
# micromamba gmpy2 is an old version 2.1.5 in august 2024 (latest=2.2.1)
H = mpqize(np.asarray(H))
if not self.remesh:
Mh = mpqize(np.asarray(Mh))
# check if ke rnel was not already cached
key = Kernel.hash_kernel_key(n, r, deg, Ms, H, remesh)
if override_cache:
if verbose:
print(" Cache overwrite requested.")
else:
if verbose:
print(" Checking if kernel was already computed... ", end=" ")
data = load_data_from_cache(Kernel.cache_file(), key)
if data is not None:
if verbose:
print("True")
print(" Loading kernel from cache.")
if data == self.SINGULAR_SYSTEM:
raise RuntimeError("Could not solve linear system.")
kernel = cls(
verbose=verbose, register=False, split_polys=split_polys, **data
)
return kernel
elif verbose:
print("False")
print(" Building linear system...")
# polynom symbolic variable
x = sm.Symbol("x")
# build Ms*(deg+1) symbolic coefficients (polynomial unknowns)
coeffs = []
for k in range(Ms):
coeffs.append([sm.symbols(f"C{k}_{d}") for d in range(deg + 1)])
# build discrete moments rhs values
M = []
for i in range(n + 1):
Mi = []
for j in range(deg + 1):
if remesh and i == j:
Mij = sm.Rational(1)
elif not remesh and i - j >= 0 and Mh[i - j] != 0:
Mij = sm.symbols(f"M{i}_{j}")
else:
Mij = 0
Mi.append(Mij)
M.append(Mi)
# build the Ms polynomials
Pp = []
Pm = []
for i, C in enumerate(coeffs):
pexpr = sm.Integer(0) # NG 28 aug 2024: replace f2q by sm.Integer
mexpr = sm.Integer(0) # NG 28 aug 2024: replace f2q by sm.Integer
for d, c in enumerate(C):
pexpr += c * (x**d)
mexpr += c * ((-x) ** d)
Pp.append(sm.polys.polytools.poly(pexpr, x))
Pm.append(sm.polys.polytools.poly(mexpr, x))
P = Pm[::-1] + Pp
# precompute the r first polynomial derivatives
dPs = []
for p in Pp:
dP = [p]
for i in range(r):
p = p.diff()
dP.append(p)
dPs.append(dP)
# Build overdetermined system of equations
eqs = []
# Continuity equations (Gamma is Cr continuous)
# Parity in x=0 -- Gamma is EVEN -> (r+1)//2 eqs
for d in range(1, r + 1, 2):
eq = coeffs[0][d] # =0
eqs.append(eq)
# Right-most point, zero derivatives -> (r+1) eqs
for d in range(r + 1):
eq = dPs[-1][d].xreplace({x: sm.Integer(Ms)}) # =0
eqs.append(eq.as_expr())
# Cr-continuity on inner points -> (Ms-1)*(r+1) eqs
for d in range(r + 1):
for i in range(Ms - 1):
eq = dPs[i][d].xreplace({x: sm.Integer(i + 1)}) - \
dPs[i + 1][d].xreplace({x: sm.Integer(i + 1)}) # = 0
eqs.append(eq.as_expr())
# Interpolation condition on the left -> Ms equations
for i in range(Ms):
# NG 3 sep 2024: eq = Pp[i].xreplace({x: sm.Integer(i)}) - H[Ms + i]
eq = Pp[i].xreplace({x: sm.Integer(i)}) - \
sm.Rational(sm.Integer(H[Ms + i].numerator), \
sm.Integer(H[Ms + i].denominator))
eqs.append(eq.as_expr())
# Discrete moments
s = sm.symbols("s")
for m in range(0, n + 1):
expr = sm.Integer(0) # NG 28 aug 2024: replace f2q by sm.Integer
for l in range(-Ms + 1, Ms + 1):
if m > 0 and l == 0:
continue
i = Ms - l
# NG 28 august 2024 => e = P[i].xreplace({x: s - f2q(l)}).as_expr()
e = P[i].xreplace({x: s - sm.Integer(l)}).as_expr()
if m > 0:
# NG 28 august 2024 => e *= f2q(l**m)
e *= sm.Rational(l**m)
expr += e
Pm = sm.polys.polytools.poly(expr, s)
for i, Cmi in enumerate(Pm.all_coeffs()[::-1]):
# NG 28 august 2024 => eqs.append( Cmi - sm.Rational(M[m][i]) )
eqs.append( Cmi - sm.Rational(M[m][i]) )
if verbose:
print(" => System built.")
unknowns = [c for cl in coeffs for c in cl] + [
m for ml in M for m in ml if isinstance(m, sm.Symbol)
]
if verbose:
print(" Unknowns: ", unknowns)
sol = sm.solve(eqs, unknowns)
if len(sol) != len(unknowns):
if verbose:
print("sol=", sol)
update_cache(Kernel.cache_file(), key, self.SINGULAR_SYSTEM)
raise RuntimeError("Could not solve linear system.")
elif verbose:
print(" => System solved.")
for k in sorted(sol.keys(), key=lambda x: x.name):
print(f" {k}: {sol[k]}")
for i, Pix in enumerate(P):
P[i] = Pix.xreplace(sol)
# 28 august 2024: change H=mpqize(np.asarray(H)) => H = np.asarray(H)
# micromamba gmpy2 version is to old !
assert P[i].domain==sm.QQ,"Polynoms are not rational!!!"
kernel = cls(
n=n,
r=r,
deg=deg,
Ms=Ms,
H=mpqize(H),
Mh=Mh,
remesh=remesh,
P=P,
register=True,
verbose=verbose,
split_polys=split_polys,
)
return kernel
if __name__ == "__main__":
from hysop.numerics.stencil.stencil_generator import StencilGenerator
from matplotlib import pyplot as plt
verbose = True
sg = StencilGenerator()
sg.configure(dim=1, derivative=2)
for i in range(1, 5):
p = 2 * i
H = sg.generate_exact_stencil(order=p - 1, origin=i)
print(H.coeffs)
assert H.is_centered()
H = [0] + H.coeffs.tolist() + [0]
kg = SymmetricKernelGenerator(verbose).configure(p, H=H)
kernels = []
for r in [1, 2, 4, 8]:
try:
kernels.append(kg.solve(r, override_cache=False))
except RuntimeError:
print(f"Solver failed fro p={p} and r={r}.")
if len(kernels) == 0:
continue
continue
k0 = kernels[0]
fig = plt.figure()
plt.xlabel(r"$x$")
plt.ylabel(r"$\Lambda_{" + "{},{}".format(p, "r") + "}$")
X = np.linspace(-k0.Ms - 1, +k0.Ms + 1, 1000)
s = plt.subplot(1, 1, 1, label=i)
for i, k in enumerate(kernels):
s.plot(X, k(X), label=r"$\Lambda_{" + f"{p},{k.r}" + "}$")
s.plot(k0.I, k0.H, "or")
axe_scaling = 0.10
ylim = s.get_ylim()
Ly = ylim[1] - ylim[0]
s.set_ylim(ylim[0] - axe_scaling * Ly, ylim[1] + axe_scaling * Ly)
s.legend()
plt.show(block=True)
# plt.savefig('out/gamma_{}_r'.format(p))